package txpool

import (
	"hash/fnv"
	"os"
	"osiris/config"
	"osiris/dao"
	"osiris/dto"
	"osiris/logger"
	"osiris/ocrypto/ecdsa"
	"sync"

	"github.com/steakknife/bloomfilter"
)

// Constants used by the local transaction pool.
//
//	bfFile_suffix ("local.bf.gz") is the suffix of the file that stores the
//	Bloom filter of local account addresses. The full file name is built in
//	Init as config.NodeCodeName + "_" + bfFile_suffix.
const (
	bfFile_suffix = "local.bf.gz"
)

// LocalTxPool extends BaseTxPool with a Bloom filter that is used to test
// whether an address belongs to a local account.
type LocalTxPool struct {
	BaseTxPool

	// localBFName is the name of the file the Bloom filter of local account
	// addresses is loaded from and persisted to.
	localBFName string

	// localBF is the in-memory Bloom filter instance; it may be nil when it
	// could not be loaded or created.
	localBF *bloomfilter.Filter

	// bfMutex is taken by isLocalAddr, AddLocalAddr and ResetLocalBF around
	// their accesses to localBF.
	bfMutex *sync.RWMutex
}

// Init prepares the pool for use: it allocates the queued/pending containers
// and their locks, records the size limits, and then concurrently restores the
// queued transactions from the database and builds the local-address Bloom
// filter (from the local.bf.gz file when it can be read). Init returns once
// both restore steps have finished; the errors they return are not propagated
// because the steps are started as goroutines.
//
// pending is not restored: according to the original author, a transaction
// only reaches pending after its resulting state is already part of the local
// state tree and the transaction has been broadcast.
//
//	@receiver pool
//	@param queuedSize limit stored in queuedLimitSize
//	@param pendingSize limit stored in pendingLimitSize
func (pool *LocalTxPool) Init(queuedSize int, pendingSize int) {
	// Limits and the name of the persisted Bloom filter file.
	pool.queuedLimitSize, pool.pendingLimitSize = queuedSize, pendingSize
	pool.localBFName = config.NodeCodeName + "_" + bfFile_suffix

	// Containers together with the locks that guard them.
	pool.queued = map[string]*TxDescendArray{}
	pool.queuedMutex = &sync.RWMutex{}
	pool.pending = map[string][]*dto.TxData{}
	pool.pendingMutex = &sync.RWMutex{}
	pool.bfMutex = &sync.RWMutex{}

	// Run both restore steps in parallel and wait for them.
	var restore sync.WaitGroup
	restore.Add(2)
	go pool.initLocalBF(&restore)
	go pool.initLocalQueued(&restore)
	restore.Wait()
}

// Close persists the state of the local pool: the Bloom filter is written to
// its file and the queued transactions are written to the database. Both
// steps run concurrently and Close returns after both have finished; their
// returned errors are not propagated.
//
//	@receiver pool
func (pool *LocalTxPool) Close() {
	var flush sync.WaitGroup
	flush.Add(2)
	go pool.storeLocalBF(&flush)
	go pool.storeLocalQueued(&flush)
	flush.Wait()
}

// isLocalAddr reports whether addrStr is present in the Bloom filter of local
// account addresses. It returns false when no filter is available.
//
// NOTE(review): the FNV-64 hash is computed over the raw bytes of the string,
// whereas initLocalBF feeds AddLocalAddr the output of
// ecdsa.StrToAccountBytes — confirm both byte encodings agree.
//
//	@receiver pool
//	@param addrStr account address as a string
//	@return bool true when the filter contains the address
func (pool *LocalTxPool) isLocalAddr(addrStr string) bool {
	pool.bfMutex.RLock()
	defer pool.bfMutex.RUnlock()

	filter := pool.localBF
	if filter != nil {
		digest := fnv.New64()
		digest.Write([]byte(addrStr))
		return filter.Contains(digest)
	}

	return false
}

// initLocalBF initialises the Bloom filter of local account addresses. It
// first tries to read the filter from pool.localBFName; when that fails it
// creates a new filter sized by config.MaxLocalElements and
// config.ProbLocalCollide and fills it with every address returned by
// dao.GetAllLocal.
//
//	@receiver pool
//	@param wg wait group that is marked done when the function returns
//	@return error the filter-creation or database error, nil otherwise
func (pool *LocalTxPool) initLocalBF(wg *sync.WaitGroup) error {
	defer wg.Done()

	// Preferred path: load the persisted filter from disk.
	var readErr error
	pool.localBF, _, readErr = bloomfilter.ReadFile(pool.localBFName)
	if readErr == nil {
		logger.Info(map[string]interface{}{"[txpool] [init Local BF] read local bloomfilter file": "succeed"})
		return nil
	}
	logger.Warn(map[string]interface{}{"[txpool] [init Local BF] read local bloomfilter file": readErr})

	// Fallback: start from an empty filter.
	var createErr error
	pool.localBF, createErr = bloomfilter.NewOptimal(config.MaxLocalElements, config.ProbLocalCollide)
	if createErr != nil {
		logger.Warn(map[string]interface{}{"[txpool] [init Local BF] new local bloomfilter": createErr})
		return createErr
	}

	// Re-populate the empty filter with the local addresses kept in the database.
	locals, queryErr := dao.GetAllLocal()
	if queryErr != nil {
		logger.Error(map[string]interface{}{"[txpool] [init Local BF] dao.GetAllLocal()": queryErr})
		return queryErr
	}
	for _, entry := range locals {
		pool.AddLocalAddr(ecdsa.StrToAccountBytes(entry.Addr))
	}

	return nil
}

// ResetLocalBF resets the Bloom filter of local account addresses: the
// persisted filter file is deleted (if it exists) and pool.localBF is replaced
// by a new, empty filter.
//
//	@receiver pool
//	@return error the file-removal or filter-creation error, nil otherwise
func (pool *LocalTxPool) ResetLocalBF() error {
	pool.bfMutex.Lock()
	defer pool.bfMutex.Unlock()

	// The file is only written by storeLocalBF, so it may legitimately not
	// exist yet; a missing file must not prevent the in-memory reset.
	if err := os.Remove(pool.localBFName); err != nil && !os.IsNotExist(err) {
		logger.Error(map[string]interface{}{"[txpool] [Reset Local BF] os.Remove()": err})
		return err
	}

	var err error
	pool.localBF, err = bloomfilter.NewOptimal(config.MaxLocalElements, config.ProbLocalCollide)
	if err != nil {
		logger.Warn(map[string]interface{}{"[txpool] [Reset Local BF] new local bloomfilter": err})
		return err
	}

	return nil
}

// AddLocalAddr adds the FNV-64 hash of addr to the Bloom filter of local
// account addresses. It is a no-op when no filter is available.
//
//	@receiver pool
//	@param addr account address bytes
func (pool *LocalTxPool) AddLocalAddr(addr []byte) {
	// Hashing does not touch shared state, so do it before taking the lock.
	hashFn := fnv.New64()
	hashFn.Write(addr)

	pool.bfMutex.Lock()
	defer pool.bfMutex.Unlock()

	// The nil check must happen under the lock: ResetLocalBF replaces
	// pool.localBF while holding bfMutex, so an unguarded check could pass
	// and the filter could be swapped before Add is called.
	if pool.localBF == nil {
		return
	}
	pool.localBF.Add(hashFn)
}

// OnTxsConfirmed handles a batch of confirmed transactions. For every
// transaction whose hash is found in the sender's pending list it removes the
// entry from pending, sends the transaction to every channel in
// pool.confirmSubChs and, for dto.TX_PLEDGE transactions, passes it to
// addconfirmedPledgeTx. Transactions that are not pending are ignored.
//
// pendingMutex is held for the whole call, including the channel sends; a
// send that blocks therefore blocks other users of pendingMutex as well.
//
//	@receiver pool
//	@param txs confirmed transactions
func (pool *LocalTxPool) OnTxsConfirmed(txs []dto.TxData) {
	pool.pendingMutex.Lock()
	defer pool.pendingMutex.Unlock()

	for _, txData := range txs {
		fromAddr := txData.FromAddr
		tempTxs, exist := pool.pending[fromAddr]
		if !exist {
			continue
		}

		for index, tx := range tempTxs {
			if tx.TxHash != txData.TxHash {
				continue
			}

			// Remove the confirmed transaction from pending.
			pool.pending[fromAddr] = append(tempTxs[:index], tempTxs[index+1:]...)

			// Notify the confirmation subscribers.
			for _, ch := range pool.confirmSubChs {
				ch <- txData
			}

			// Cache confirmed local pledge transactions.
			if txData.TxType == dto.TX_PLEDGE {
				addconfirmedPledgeTx(txData)
			}

			// The append above shifted the elements of the slice being
			// ranged over, so continuing the scan would read a stale view.
			// Stop at the first match.
			break
		}
	}
}

// initLocalQueued restores the queued transactions of the local pool from the
// database and then starts a goroutine that truncates the stored queued
// records. Records that cannot be converted are logged and skipped.
//
//	@receiver pool
//	@param wg wait group that is marked done when the function returns
//	@return error the database read error, or the last conversion error, nil otherwise
func (pool *LocalTxPool) initLocalQueued(wg *sync.WaitGroup) error {
	defer wg.Done()

	models, loadErr := dao.GetAllLocalQueued()
	if loadErr != nil {
		logger.Error(map[string]interface{}{"[txpool] [init Local Queued] dao.GetAllLocalQueued()": loadErr})
		return loadErr
	}

	// Remember only the most recent conversion failure; keep going so one bad
	// record does not prevent the others from being restored.
	var lastErr error
	for _, model := range models {
		txData, convErr := dto.FromLocalQueuedModel(model)
		if convErr == nil {
			pool.inQueued(txData)
			continue
		}
		lastErr = convErr
		logger.Error(map[string]interface{}{"[txpool] [init Local Queued] from local queued model to txData": convErr})
	}

	// The truncation runs in the background and is not waited for.
	go dao.TruncateLocalQueued()

	return lastErr
}

// storeLocalBF writes the Bloom filter of local account addresses to the file
// named pool.localBFName. Nothing is written when no filter is available.
//
// NOTE(review): pool.localBF is read here without bfMutex, unlike
// isLocalAddr and ResetLocalBF — confirm Close cannot overlap with them.
//
//	@receiver pool
//	@param wg wait group that is marked done when the function returns
//	@return error the write error, nil otherwise
func (pool *LocalTxPool) storeLocalBF(wg *sync.WaitGroup) error {
	defer wg.Done()

	if pool.localBF != nil {
		if _, writeErr := pool.localBF.WriteFile(pool.localBFName); writeErr != nil {
			logger.Error(map[string]interface{}{"[txpool] [store Local BF] write file": writeErr})
			return writeErr
		}
	}

	return nil
}

// storeLocalQueued writes every transaction held in the queued set of this
// pool to the database. A failed write is logged and does not stop the
// remaining transactions from being stored.
//
//	@receiver pool
//	@param wg wait group that is marked done when the function returns
//	@return error the last write error, nil when every write succeeded
func (pool *LocalTxPool) storeLocalQueued(wg *sync.WaitGroup) error {
	defer wg.Done()

	var err error
	// Iterate the receiver's own queued set rather than a package-level pool.
	for _, txs := range pool.queued {
		for _, tx := range txs.GetTxs() {
			if tempErr := dao.AddLocalQueuedByTxData(*tx); tempErr != nil {
				// Log the error of this write, not the previously recorded one.
				logger.Error(map[string]interface{}{"[txpool] [store Local Queued] dao.AddLocalQueuedByTxData()": tempErr})
				err = tempErr
			}
		}
	}

	return err
}
